PoseNet Pythonで棒人間化した動画をKinesis Video Streamsに配信してみた
CX事業本部@大阪の岩田です。以前このツイートを見た時に気になっていたPoseNetを使って、Macのカメラで撮影した動画の棒人間化に挑戦してみました。
超便利なの作ってしまった。なにかと棒人間のgifが欲しいことがあるんだけど、自前でアニメーション作るのは大変。なのでウェブカメラで撮影した動画から棒人間gifを自動生成できるサイトを作った。PoseNetを使っているのでブラウザ上で動きます。遊んでみてください!: https://t.co/z6LtZbsB1P pic.twitter.com/ckPMe28URF
— Ken Kawamoto(ガリのほう) (@kenkawakenkenke) March 16, 2021
単に棒人間化する記事は以前Zennに投稿しているので、棒人間化した動画をKinesis Video Streamsに配信するところまで実装してみます。
やること
今回やることはざっくり以下の通りです
- Mac付属のカメラで動画を撮影する
- 撮影した動画はPoseNet Pythonで棒人間化する
- 棒人間化した動画はKinesis Video Streamsに配信する
- Kinesis Video Streamsへの配信にはGStreamerを使わない
- これを全部Pythonで実装する
- つまりKinesis Video Streams プロデューサーライブラリ無しの実装に挑戦します
- 以下ブログのさらに改良版に挑戦する形になります
- 【Kinesis Video Streams】Pythonで動画ファイルを送信する
環境
今回利用した環境です
- OS Mac OS X 10.15
- Python: 3.7.8
- av: 8.0.3
- boto3: 1.17.98
- botocore: 1.20.98
- numpy: 1.21.0
- opencv-python: 4.5.2.54
- posenet-python
- PyYAML: 5.4.1
- scipy: 1.7.0
- tensorflow: 1.13.1
事前準備
今回動画の棒人間化にはPoseNetのPython実装であるPoseNet Pythonを利用します。まずはPoseNet Pythonのリポジトリをクローンします
$ git clone https://github.com/rwightman/posenet-python Cloning into 'posenet-python'... remote: Enumerating objects: 119, done. remote: Total 119 (delta 0), reused 0 (delta 0), pack-reused 119 Receiving objects: 100% (119/119), 36.56 KiB | 4.57 MiB/s, done. Resolving deltas: 100% (68/68), done.
続いて仮想環境を作成し、必要な依存ライブラリをインストールします。
$ cd posenet-python/ $ python -m venv .venv $ source .venv/bin/activate $ pip install numpy opencv-python scipy tensorflow==1.13.1 pyyaml Collecting numpy Using cached numpy-1.21.0-cp37-cp37m-macosx_10_9_x86_64.whl (16.8 MB) Collecting opencv-python Using cached opencv_python-4.5.2.54-cp37-cp37m-macosx_10_15_x86_64.whl (43.7 MB) ...略
ここまで準備できたらリポジトリに含まれているサンプルコードが動作することを確認します。
$ python webcam_demo.py
ここまでで棒人間化の準備が整いました。続いてKinesis Video Streamsに動画を配信するための準備をしていきます。Kinesis Video Streamのエンドポイント取得とSIGV4の署名を作成するためにboto3を、カメラで撮影した画像からMKVファイルを作成するためにPyAVをインストールします
$ pip install boto3 av
これで実装の準備が整いました。今回詳細は割愛しますが
- Kinesis Video Streamsのストリーム作成
- 作成したストリームにPutMedia可能なクレデンシャル情報
も準備しておいて下さい。
実装
ここからは実際に棒人間の動画をKinesis Video Streamsに配信するロジックを実装します。先程Posenet Pythonの動作確認に使用したサンプルファイルwebcam_demo.py
を流用しつつ実装していきます。最終的なコードは以下のようになりました。エラーハンドリングなどは省略していますし、関数の分け方等もあまりキレイではないので、もし参考にされる場合はあくまで参考程度として下さい。
また、前提としてKinesis Video Streamsのストリームは事前作成済み、ストリームに書き込むためのクレデンシャル情報はセットアップ済みとします。
import argparse import concurrent.futures import time import io import av import boto3 import botocore from botocore.auth import SigV4Auth import cv2 import hashlib import tensorflow as tf import numpy as np import posenet class gen_request_parameters: def __init__(self, data): self._data = data self._pointer = 0 self._size = len(self._data) def __iter__(self): return self def __next__(self): if self._pointer >= self._size: raise StopIteration # signals "the end" left = self._size - self._pointer chunksz = 16000 if left < chunksz: chunksz = left pointer_start = self._pointer self._pointer += chunksz return self._data[pointer_start:self._pointer] def get_data_endpoint(stream_name, region): kvs_client = boto3.client('kinesisvideo', region_name=region) res = kvs_client.get_data_endpoint(StreamName=stream_name, APIName='PUT_MEDIA') return res['DataEndpoint'] def put_media(stream_name, url, start_ts, data): sha256_digest = hashlib.sha256(data).hexdigest() headers= { 'x-amzn-stream-name': stream_name, 'x-amzn-fragment-timecode-type': 'RELATIVE', 'x-amzn-producer-start-timestamp': str(int(start_ts)), 'x-amz-content-sha256': sha256_digest } request = botocore.awsrequest.AWSRequest(method='POST', url=url, headers=headers, data=gen_request_parameters(data)) credentials = boto3.Session().get_credentials() service = 'kinesisvideo' region = 'ap-northeast-1' botocore.auth.SigV4Auth(credentials, service, region).add_auth(request) res = botocore.httpsession.URLLib3Session().send(request.prepare()) print(res.headers) parser = argparse.ArgumentParser() parser.add_argument('--model', type=int, default=101) parser.add_argument('--cam_width', type=int, default=1280) parser.add_argument('--cam_height', type=int, default=720) parser.add_argument('--scale_factor', type=float, default=0.7125) args = parser.parse_args() def main(): stream_name = <事前に作成したKVSのストリーム名> region = 'ap-northeast-1' data_endpoint = get_data_endpoint(stream_name, region) url = f'{data_endpoint}/putMedia' with tf.Session() as sess, concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor: model_cfg, model_outputs = posenet.load_model(args.model, sess) output_stride = model_cfg['output_stride'] cap = cv2.VideoCapture(0) cap.set(3, args.cam_width) cap.set(4, args.cam_height) cap_fps = 10 while True: out_buffer = io.BytesIO() container = av.open(out_buffer, 'w', 'matroska') stream = container.add_stream('h264', rate=cap_fps) stream.width = args.cam_width stream.height = args.cam_height stream.pix_fmt = 'yuv420p' i = 0 start_ts = time.time() while i < (cap_fps * 9): input_image, display_image, output_scale = posenet.read_cap( cap, scale_factor=args.scale_factor, output_stride=output_stride) heatmaps_result, offsets_result, displacement_fwd_result, displacement_bwd_result = sess.run( model_outputs, feed_dict={'image:0': input_image} ) pose_scores, keypoint_scores, keypoint_coords = posenet.decode_multi.decode_multiple_poses( heatmaps_result.squeeze(axis=0), offsets_result.squeeze(axis=0), displacement_fwd_result.squeeze(axis=0), displacement_bwd_result.squeeze(axis=0), output_stride=output_stride, max_pose_detections=10, min_pose_score=0.15) keypoint_coords *= output_scale skeleton_img = np.zeros(display_image.shape, dtype=np.uint8) skeleton_img = posenet.draw_skeleton( skeleton_img, pose_scores, keypoint_scores, keypoint_coords, min_pose_confidence=0.02, min_part_confidence=0.02) frame = av.VideoFrame.from_ndarray(skeleton_img, format='rgb24') for packet in stream.encode(frame): container.mux(packet) i += 1 cv2.imshow('posenet', skeleton_img) if cv2.waitKey(1) & 0xFF == ord('q'): break # Flush stream for packet in stream.encode(): container.mux(packet) container.close() out_buffer.seek(0) data = out_buffer.read() executor.submit(put_media, stream_name, url, start_ts, data) if __name__ == "__main__": main()
1つづつコードを解説していきます。まずはgen_request_parameters
です。
class gen_request_parameters: def __init__(self, data): self._data = data self._pointer = 0 self._size = len(self._data) def __iter__(self): return self def __next__(self): if self._pointer >= self._size: raise StopIteration # signals "the end" left = self._size - self._pointer chunksz = 16000 if left < chunksz: chunksz = left pointer_start = self._pointer self._pointer += chunksz return self._data[pointer_start:self._pointer]
こちらは後ほどKinesis Video Streamのエンドポイントに送信する動画のバイナリデータをイテレータオブジェクトにラップするためのクラスです。こちらのコードをほぼそのまま流用させて頂きました。
https://coderoad.ru/59481174/Amazon-AWS-Kinesis-Video-Boto-GetMedia-PutMedia
続いて get_data_endpoint
def get_data_endpoint(stream_name, region): kvs_client = boto3.client('kinesisvideo', region_name=region) res = kvs_client.get_data_endpoint(StreamName=stream_name, APIName='PUT_MEDIA') return res['DataEndpoint']
Kinesis Video Streamのエンドポイントを取得するためのシンプルな関数です
次は put_media
です。こちらはKinesis Video StreamのエンドポイントにPutMediaするための処理です。
def put_media(stream_name, url, start_ts, data): sha256_digest = hashlib.sha256(data).hexdigest() headers= { 'x-amzn-stream-name': stream_name, 'x-amzn-fragment-timecode-type': 'RELATIVE', 'x-amzn-producer-start-timestamp': str(int(start_ts)), 'x-amz-content-sha256': sha256_digest } request = botocore.awsrequest.AWSRequest(method='POST', url=url, headers=headers, data=gen_request_parameters(data)) credentials = boto3.Session().get_credentials() service = 'kinesisvideo' region = 'ap-northeast-1' botocore.auth.SigV4Auth(credentials, service, region).add_auth(request) res = botocore.httpsession.URLLib3Session().send(request.prepare()) print(res.headers)
PutMediaにはSIGV4の署名が必要になるので、SigV4Authクラスのadd_authで署名を作成&付与しています。署名を作成するためのクレデンシャル情報はboto3.Session().get_credentials()
を利用すると簡単に取得できます。以下の部分です
request = botocore.awsrequest.AWSRequest(method='POST', url=url, headers=headers, data=gen_request_parameters(data)) credentials = boto3.Session().get_credentials() service = 'kinesisvideo' region = 'ap-northeast-1' botocore.auth.SigV4Auth(credentials, service, region).add_auth(request)
最後にメインの処理です。カメラで撮影した映像を読み込んで棒人間化、Kinesis Video Streamsまで配信します。
stream_name = <事前に作成したKVSのストリーム名> region = 'ap-northeast-1' data_endpoint = get_data_endpoint(stream_name, region) url = f'{data_endpoint}/putMedia' with tf.Session() as sess, concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor: model_cfg, model_outputs = posenet.load_model(args.model, sess) output_stride = model_cfg['output_stride'] cap = cv2.VideoCapture(0) cap.set(3, args.cam_width) cap.set(4, args.cam_height) cap_fps = 10 while True: out_buffer = io.BytesIO() container = av.open(out_buffer, 'w', 'matroska') stream = container.add_stream('h264', rate=cap_fps) stream.width = args.cam_width stream.height = args.cam_height stream.pix_fmt = 'yuv420p' i = 0 start_ts = time.time() while i < (cap_fps * 9): input_image, display_image, output_scale = posenet.read_cap( cap, scale_factor=args.scale_factor, output_stride=output_stride) heatmaps_result, offsets_result, displacement_fwd_result, displacement_bwd_result = sess.run( model_outputs, feed_dict={'image:0': input_image} ) pose_scores, keypoint_scores, keypoint_coords = posenet.decode_multi.decode_multiple_poses( heatmaps_result.squeeze(axis=0), offsets_result.squeeze(axis=0), displacement_fwd_result.squeeze(axis=0), displacement_bwd_result.squeeze(axis=0), output_stride=output_stride, max_pose_detections=10, min_pose_score=0.15) keypoint_coords *= output_scale skeleton_img = np.zeros(display_image.shape, dtype=np.uint8) skeleton_img = posenet.draw_skeleton( skeleton_img, pose_scores, keypoint_scores, keypoint_coords, min_pose_confidence=0.02, min_part_confidence=0.02) frame = av.VideoFrame.from_ndarray(skeleton_img, format='rgb24') for packet in stream.encode(frame): container.mux(packet) i += 1 # Flush stream for packet in stream.encode(): container.mux(packet) container.close() out_buffer.seek(0) data = out_buffer.read() executor.submit(put_media, stream_name, url, start_ts, data)
メイン処理の中のポイントをいくつか解説していきます。
MKVファイルを作成する処理
PyAVを利用してメモリ上にMKVファイルを作成します。まずはav.open()
でOutputContainer
を作成します。第3引数に matroska
を指定することでマルチメディアコンテナフォーマットがMKV(Matroska Video File)形式になります。
out_buffer = io.BytesIO() container = av.open(out_buffer, 'w', 'matroska')
続いてOutputContainer
にストリームを追加します。Kinesis Video Streamsに送信できるようにコーデックにはh264
を指定します
stream = container.add_stream('h264', rate=cap_fps) stream.width = args.cam_width stream.height = args.cam_height stream.pix_fmt = 'yuv420p'
以後はこのストリームに順次フレームを追加していきます。棒人間画像はNumPyのndarray
形式で取得されているので、av.VideoFrame.from_ndarray
でフレームに変換し、コンテナに追加していきます。
frame = av.VideoFrame.from_ndarray(skeleton_img, format='rgb24') for packet in stream.encode(frame): container.mux(packet)
Kinesis Video Streamsの上限に引っかからない程度にフレームを追加し終わったら、MKVファイルをメモリ上に書き出し&読み出してMKVファイルのバイナリデータをdata
という変数に格納します。
for packet in stream.encode(): container.mux(packet) container.close() out_buffer.seek(0) data = out_buffer.read() executor.submit(put_media, stream_name, url, start_ts, data)
バイナリデータの準備ができたので、別プロセスでKinesis Video StreamsにPutMediaを実行します。
棒人間画像を生成する処理
続いて棒人間画像を生成する部分です。posenet.read_cap
でカメラから画像を読み込み、読み込んだ画像に対してTensorFlowのSession
をrun
し、その結果に対してPoseNet Pythonの decode_multiple_poses
を実行することで骨格検出の結果を取得します。
input_image, display_image, output_scale = posenet.read_cap( cap, scale_factor=args.scale_factor, output_stride=output_stride) heatmaps_result, offsets_result, displacement_fwd_result, displacement_bwd_result = sess.run( model_outputs, feed_dict={'image:0': input_image} ) pose_scores, keypoint_scores, keypoint_coords = posenet.decode_multi.decode_multiple_poses( heatmaps_result.squeeze(axis=0), offsets_result.squeeze(axis=0), displacement_fwd_result.squeeze(axis=0), displacement_bwd_result.squeeze(axis=0), output_stride=output_stride, max_pose_detections=10, min_pose_score=0.15)
最後にnp.zeros
で真っ黒な画像を生成し、検出された骨格のラインをposenet.draw_skeleton
で書き出して棒人間の画像を生成します。あとはこの画像skeleton_img
を先程解説したMKVファイルを作成する処理に渡してKinesis Video Streamsに送信すればOKです
skeleton_img = np.zeros(display_image.shape, dtype=np.uint8) skeleton_img = posenet.draw_skeleton( skeleton_img, pose_scores, keypoint_scores, keypoint_coords, min_pose_confidence=0.02, min_part_confidence=0.02)
実行結果
準備ができたので実行してみましょう
$ python kvs.py
マネコンを監視していると...
棒人間...と言えるレベルではありませんが、棒が表示されました。まあこの辺は各種の閾値の調整だったり被写体の距離やポーズによってもう少し棒人間っぽくなってくれると思います。
Kinesis Video StreamsからダウンロードしたMP4ファイルもちゃんと棒人間?が表示できています
まとめ
バッファしたMKVファイルをPutMediaする度に若干映像に隙間が発生してしまって、ニアリアルタイムなストリーム配信としては微妙なできなのですが、一応は目的が達成できました。Kinesis Video Streamsを業務で利用する場合はプライバシーへの配慮も重要になってくると思います。エッジ側で動画を棒人間化してしまえば個人が識別できない動画としてクラウドに送信できるるので、色々と面倒なことを考えなくて済むようになりそうです。ユースケース次第ではこういった処理も使えるかもしれませんね。